Source code for hysop.operator.directional.directional

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import ABCMeta, abstractmethod
from hysop.tools.htypes import check_instance, to_list, first_not_None, to_set
from hysop.tools.decorators import debug, static_vars
from hysop.tools.sympy_utils import subscript
from hysop.constants import Implementation, DirectionLabels, TranspositionState
from hysop.core.graph.graph import generated
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.core.graph.computational_operator import ComputationalGraphNode


class DirectionalOperatorBase:
    """
    Implementation interface for directional operators.
    """

    @debug
    def __new__(cls, splitting_dim, splitting_direction, dt_coeff, **kwds):
        return super().__new__(cls, **kwds)

    @debug
    def __init__(self, splitting_dim, splitting_direction, dt_coeff, **kwds):
        """
        Create a directional operator in a given direction.

        Parameters
        ----------
        splitting_direction: int
            Direction of this operator.
        splitting_dim: int
            The dimension of the splitting.
        dt_coeff: float
            Coefficient that should be applied to the simulation timestep.

        Attributes
        ----------
        splitting_direction: int
            Direction of this operator.
        splitting_dim: int
            The dimension of the splitting.
        dt_coeff: float
            Coefficient that should be applied to the simulation timestep.
        """
        check_instance(splitting_dim, int)
        check_instance(splitting_direction, int)
        check_instance(dt_coeff, float)

        dim = splitting_dim
        direction = splitting_direction
        if (direction < 0) or (direction >= dim):
            msg = "Bad splitting direction '{}' for splitting dimension {}."
            msg = msg.format(direction, dim)
            raise ValueError(msg)
        assert dt_coeff > 0.0

        self.splitting_dim = splitting_dim
        self.splitting_direction = splitting_direction
        self.dt_coeff = dt_coeff

        super().__init__(**kwds)

    @debug
    def get_field_requirements(self):
        requirements = super().get_field_requirements()

        direction = self.splitting_direction
        dim = self.splitting_dim
        sdir = DirectionLabels[direction]

        # keep only transposition states whose contiguous (last) axis matches
        # the splitting direction
        axes = TranspositionState[dim].filter_axes(
            lambda axes: (axes[-1] == dim - 1 - direction)
        )
        axes = tuple(axes)  # might be problematic for high dimensions

        for field, td, req in requirements.iter_input_requirements():
            req.axes = axes
        for field, td, req in requirements.iter_output_requirements():
            req.axes = axes

        return requirements
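

# Illustrative sketch (not part of the module): a concrete directional operator is
# expected to mix DirectionalOperatorBase with an actual computational node class
# (the backend base class name below is hypothetical), forwarding the splitting
# arguments to this interface:
#
#     class MyDirectionalStencil(DirectionalOperatorBase, SomeBackendOperator):
#         def __init__(self, splitting_dim, splitting_direction, dt_coeff, **kwds):
#             super().__init__(splitting_dim=splitting_dim,
#                              splitting_direction=splitting_direction,
#                              dt_coeff=dt_coeff, **kwds)
#
# get_field_requirements() then restricts field transposition states so that the
# contiguous (last) axis matches the splitting direction: for splitting_dim=3 and
# splitting_direction=0, only permutations whose last entry is axis 2 are kept.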


class DirectionalOperatorGeneratorI:
    def __new__(cls, **kwds):
        return super().__new__(cls, **kwds)

    def __init__(self, **kwds):
        super().__init__(**kwds)
        self._generated = False
        self._splitting_dim = None

    @abstractmethod
    def generate_only_once_per_direction(self):
        pass

    @debug
    def generate(self, splitting_dim, **kwds):
        self._splitting_dim = splitting_dim
        self._direction_counter = [0] * splitting_dim
        self._generated = True
        try:
            return super().generate(**kwds)
        except AttributeError:
            pass

    @abstractmethod
    @generated
    def generate_direction(self, i, dt_coeff):
        if (i < 0) or (i >= self._splitting_dim):
            msg = "Requested direction is out of bounds (i={}, max_dir={})."
            msg = msg.format(i, self._splitting_dim)
            raise ValueError(msg)
        if self.generate_only_once_per_direction() and (self._direction_counter[i] > 0):
            return False
        self._direction_counter[i] += 1
        return True

    @generated
    def splitting_dimension(self):
        """
        Returns the splitting dimension used to generate this
        DirectionalOperatorGenerator.
        """
        return self._splitting_dim

    @generated
    def get_direction(self, i, dt_coeff):
        """
        Retrieve operator in direction i.
        """
        return self.generate_direction(i, dt_coeff)
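

# Protocol sketch (illustrative only, hypothetical generator subclass): the splitting
# dimension is fixed once through generate(), after which one operator is requested
# per (direction, dt_coeff) pair of the splitting sequence:
#
#     gen = SomeDirectionalGenerator(...)          # hypothetical concrete subclass
#     gen.generate(splitting_dim=2)
#     first = gen.get_direction(0, dt_coeff=0.5)
#     second = gen.get_direction(1, dt_coeff=1.0)
#     # requesting a direction outside [0, splitting_dim) raises a ValueError,
#     # and generate_only_once_per_direction() may suppress repeated requests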


class DirectionalOperatorGenerator(DirectionalOperatorGeneratorI, metaclass=ABCMeta):
    """
    Simple ComputationalGraphNodeGenerator to generate an operator in multiple
    directions.
    """

    @debug
    def __new__(
        cls,
        operator,
        base_kwds,
        candidate_input_tensors,
        candidate_output_tensors,
        name=None,
        pretty_name=None,
        **op_kwds,
    ):
        return super().__new__(cls, **base_kwds)

    @debug
    def __init__(
        self,
        operator,
        base_kwds,
        candidate_input_tensors,
        candidate_output_tensors,
        name=None,
        pretty_name=None,
        **op_kwds,
    ):
        """
        Initialize a DirectionalOperatorGenerator.

        Parameters
        ----------
        operator: ComputationalGraphNodeGenerator or ComputationalGraphNode
            Operator class to be built in each direction.
            operator.__init__ should accept all extra keyword arguments contained
            in op_kwds, plus 'splitting_direction', 'splitting_dim', 'dt_coeff',
            'name' and 'pretty_name'.
        base_kwds: dict, optional, defaults to None
            Base class keyword arguments.
            If None, an empty dict will be passed.
        op_kwds:
            Keyword arguments that will be passed to operator.__init__ during a
            call to _generate. In addition to those arguments, splitting_direction,
            splitting_dim and dt_coeff will also be passed.
        candidate_input_tensors: tuple of fields, optional
            Input tensor fields that should be rebuilt at discretization.
            ScalarFields are filtered out.
        candidate_output_tensors: tuple of fields, optional
            Output tensor fields that should be rebuilt at discretization.
            ScalarFields are filtered out.
        """
        super().__init__(**base_kwds)

        candidate_input_tensors = first_not_None(candidate_input_tensors, ())
        candidate_output_tensors = first_not_None(candidate_output_tensors, ())

        self.name = first_not_None(name, type(self).__name__)
        self.pretty_name = first_not_None(pretty_name, self.name)

        self.candidate_input_tensors = set(
            filter(lambda x: x.is_tensor, candidate_input_tensors)
        )
        self.candidate_output_tensors = set(
            filter(lambda x: x.is_tensor, candidate_output_tensors)
        )

        self._operator = operator
        self._op_kwds = op_kwds
        self._node_generators = None
        self._input_fields_to_dump = None
        self._output_fields_to_dump = None

    def dump_inputs(self, fields=None, io_params=None, directions=None, **kwds):
        """
        Tell this operator to dump some of its inputs before apply is called.

        Target folder, file, dump frequency and other io parameters are passed
        through io_params.
        """
        assert self._generated is False
        if directions is not None:
            directions = to_list(directions)
            directions = to_set([int(d) for d in directions])
        assert (io_params is None) or (directions is None)
        input_kwds = dict(io_params=io_params)
        input_kwds.update(kwds)
        self._input_fields_to_dump = (fields, directions, input_kwds)

    def dump_outputs(self, fields=None, io_params=None, directions=None, **kwds):
        """
        Tell this operator to dump some of its outputs after apply is called.

        Target folder, file, dump frequency and other io parameters are passed
        through io_params.
        """
        assert self._generated is False
        if directions is not None:
            directions = to_list(directions)
            directions = {int(d) for d in directions}
        assert (io_params is None) or (directions is None)
        output_kwds = dict(io_params=io_params)
        output_kwds.update(kwds)
        self._output_fields_to_dump = (fields, directions, output_kwds)

    @debug
    def generate(self, splitting_dim, **extra_kwds):
        """
        Generate splitting_dim operators in each direction.
        """
        if self._generated:
            assert splitting_dim == self._splitting_dim
            return
        super().generate(splitting_dim=splitting_dim, **extra_kwds)
        op_kwds = extra_kwds.copy()
        op_kwds["splitting_dim"] = splitting_dim
        op_kwds.update(self._op_kwds)
        self._op_kwds = op_kwds

    def generate_only_once_per_direction(self):
        return False

    @generated
    def generate_direction(self, i, dt_coeff):
        should_generate = super().generate_direction(i=i, dt_coeff=dt_coeff)
        if "mpi_params" in self._op_kwds:
            should_generate = should_generate and self._op_kwds["mpi_params"].on_task
        if not should_generate:
            return ()

        kwds = self._op_kwds
        basename = kwds.pop("name", self.name)
        basepname = kwds.pop("pretty_name", self.pretty_name)

        kargs = {}
        kargs.update(kwds)
        kargs.update(self.custom_directional_kwds(i))

        name = f"{basename}_{DirectionLabels[i]}_{self._direction_counter[i]}"
        pname = "{}_{}{}".format(
            basepname, DirectionLabels[i], subscript(self._direction_counter[i])
        )

        try:
            op = self._operator(
                name=name,
                pretty_name=pname,
                splitting_direction=i,
                dt_coeff=dt_coeff,
                **kargs,
            )
        except Exception:
            sargs = [f"*{k} = {v.__class__}" for (k, v) in kargs.items()]
            msg = "FATAL ERROR during {}.generate():\n"
            msg += " => failed to initialize an instance of type {}"
            msg += "\n by using the following keyword arguments:"
            msg += "\n " + "\n ".join(sargs)
            msg = msg.format(self.__class__, self._operator)
            print(f"\n{msg}")
            raise

        if self._input_fields_to_dump is not None:
            (fields, dirs, kwds) = self._input_fields_to_dump
            if (dirs is None) or (int(i) in dirs):
                if fields is not None:
                    fields = set(fields).intersection(op.input_fields.keys())
                else:
                    fields = op.input_fields.keys()
                if fields:
                    op.dump_inputs(fields=fields, **kwds)

        if self._output_fields_to_dump is not None:
            (fields, dirs, kwds) = self._output_fields_to_dump
            if (dirs is None) or (int(i) in dirs):
                if fields is not None:
                    fields = set(fields).intersection(op.output_fields.keys())
                else:
                    fields = op.output_fields.keys()
                if fields:
                    op.dump_outputs(fields=fields, **kwds)

        return op

    def custom_directional_kwds(self, i):
        """Specify custom keyword arguments that will be passed only for direction i."""
        return {}
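

# Sketch of the per-direction instantiation performed above (illustrative only,
# hypothetical frontend and parameter names): generated operators are suffixed with
# the direction label and the per-direction counter, and dumps registered through
# dump_inputs()/dump_outputs() are forwarded to the matching generated operators:
#
#     gen = SomeDirectionalFrontend(..., name="adv")   # hypothetical subclass
#     gen.dump_outputs(io_params=io_params, directions=(0,))
#     gen.generate(splitting_dim=3)
#     op_x = gen.get_direction(0, dt_coeff=0.5)        # e.g. named "adv_X_1"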


class DirectionalOperatorFrontend(DirectionalOperatorGenerator, metaclass=ABCMeta):
    """
    Frontend facility for directional operators that provide multiple implementations.
    """

    @debug
    def __new__(cls, implementation=None, base_kwds=None, **op_kwds):
        base_kwds = {} if (base_kwds is None) else base_kwds
        return super().__new__(cls, operator=None, base_kwds=base_kwds, **op_kwds)

    @debug
    def __init__(self, implementation=None, base_kwds=None, **op_kwds):
        """
        Initialize a DirectionalOperatorFrontend.

        Parameters
        ----------
        implementation: Implementation, optional, defaults to None
            Target implementation, should be contained in available_implementations().
            If None, implementation will be set to default_implementation().
        base_kwds: dict, optional, defaults to None
            Base class keyword arguments.
            If None, an empty dict will be passed.
        op_kwds:
            Keyword arguments that will be passed to the chosen implementation's
            __init__ during a call to _generate. In addition to those arguments,
            splitting_direction, splitting_dim and dt_coeff will also be passed.
        candidate_input_tensors: tuple of fields, optional
            Candidate input tensor fields that should be rebuilt at discretization.
            ScalarFields are filtered out.
        candidate_output_tensors: tuple of fields, optional
            Candidate output tensor fields that should be rebuilt at discretization.
            ScalarFields are filtered out.

        Attributes
        ----------
        implementation: Implementation
            The chosen implementation.
        """
        base_kwds = {} if (base_kwds is None) else base_kwds
        check_instance(implementation, Implementation, allow_none=True)
        check_instance(base_kwds, dict, keys=str)

        operator = self._get_implementation(implementation)
        if not issubclass(operator, DirectionalOperatorBase):
            msg = "Class {} does not inherit from the directional operator interface ({})."
            msg = msg.format(operator, DirectionalOperatorBase)
            raise TypeError(msg)
        if not issubclass(operator, ComputationalGraphNode):
            msg = "Class {} does not inherit from the computational graph node interface ({})."
            msg = msg.format(operator, ComputationalGraphNode)
            raise TypeError(msg)

        super().__init__(operator=operator, base_kwds=base_kwds, **op_kwds)
        self.implementation = implementation

    def _get_implementation(self, implementation):
        """
        Run some checks and return the operator implementation corresponding to the
        chosen implementation.
        """
        default_implementation = self.default_implementation()
        available_implementations = self.available_implementations()
        if not isinstance(default_implementation, Implementation):
            msg = "default_implementation is not an instance of hysop.constants.Implementation."
            raise TypeError(msg)
        for b in available_implementations:
            if not isinstance(b, Implementation):
                msg = "{} is not an instance of hysop.constants.Implementation."
                msg = msg.format(b)
                raise TypeError(msg)
        if default_implementation not in available_implementations:
            msg = "default_implementation is not contained in available_implementations."
            raise ValueError(msg)
        if implementation is None:
            implementation = default_implementation
        elif implementation not in available_implementations:
            simplementations = []
            simplementations.append(f"-{default_implementation} (default)")
            for b in available_implementations:
                if b != default_implementation:
                    simplementations.append(f"-{b}")
            msg = "\nSpecified implementation '{}' is not an available implementation\n"
            msg += "for operator {}, available implementations are:\n {}"
            msg = msg.format(
                implementation, self.__class__.__name__, "\n ".join(simplementations)
            )
            raise ValueError(msg)
        return self.implementations()[implementation]

    @classmethod
    @abstractmethod
    def implementations(cls):
        """
        Should return all implementations as a dictionary.
        Keys are Implementation instances and values are ComputationalGraphNode
        or ComputationalGraphNodeGenerator classes.
        """
        pass

    @classmethod
    @abstractmethod
    def default_implementation(cls):
        """
        Return the default Implementation, should be compatible with
        available_implementations.
        """
        pass

    @classmethod
    def available_implementations(cls):
        """
        Return all available implementations.
        """
        return cls.implementations().keys()

    def _get_generated(self):
        return self._generated

    generated = property(_get_generated)
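

# Sketch of a minimal frontend subclass (illustrative only; every name except
# Implementation and DirectionalOperatorFrontend is hypothetical): a frontend only
# has to map Implementation values to directional operator classes.
#
#     class MyDirectionalOp(DirectionalOperatorFrontend):
#
#         @classmethod
#         def implementations(cls):
#             # hypothetical backend class implementing DirectionalOperatorBase
#             return {Implementation.PYTHON: PythonMyDirectionalOp}
#
#         @classmethod
#         def default_implementation(cls):
#             return Implementation.PYTHON
#
# MyDirectionalOp(implementation=None, ...) then resolves to the default backend,
# while an unknown implementation raises a ValueError listing the available ones.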